{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ur8xi4C7S06n"
      },
      "outputs": [],
      "source": [
        "# Copyright 2021 Google LLC\n",
        "#\n",
        "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "#     https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JAPoU8Sm5E6e"
      },
      "source": [
        "# Distributed Training with Reduction Server"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tvgnzT1CKxrO"
      },
      "source": [
        "## Overview\n",
        "\n",
        "This notebook demonstrates how to optimize large distributed training jobs with the Vertex Training Reduction Server. The [TensorFlow NLP Modelling Toolkit](https://github.com/tensorflow/models/tree/master/official/nlp#tensorflow-nlp-modelling-toolkit) from the [TensorFlow Model Garden](https://github.com/tensorflow/models) is used to preprocess the data and create the model and training loop. `MultiWorkerMirroredStrategy` from the `tf.distribute` module is used to distribute model training across multiple machines. \n",
        "\n",
        "### Dataset\n",
        "\n",
        "The dataset used for this tutorial is the [Multi-Genre Natural Language Inference Corpus (MNLI)](https://www.tensorflow.org/datasets/catalog/glue#gluemnli) from the GLUE benchmark. A preprocessed version of this dataset is loaded from a Cloud Storage bucket, and used to fine tune a BERT model for sentence prediction.\n",
        "\n",
        "\n",
        "### Objective\n",
        "\n",
        "In this notebook, you create a custom-trained model from a Python script in a Docker container. You learn how to configure, submit, and monitor a Vertex Training job that uses Reduction Server to optimize network bandwith and latency of the gradient reduction operation in distributed training.  \n",
        "\n",
        "The steps performed include:\n",
        "\n",
        "- Create a Vertex Custom Training job that uses Reduction Server.\n",
        "- Submit and monitor the job.\n",
        "- Cleanup resources.\n",
        "\n",
        "### Costs \n",
        "\n",
        "\n",
        "This tutorial uses billable components of Google Cloud:\n",
        "\n",
        "* Vertex AI\n",
        "* Cloud Storage\n",
        "\n",
        "\n",
        "Learn about [Vertex AI\n",
        "pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage\n",
        "pricing](https://cloud.google.com/storage/pricing), and use the [Pricing\n",
        "Calculator](https://cloud.google.com/products/calculator/)\n",
        "to generate a cost estimate based on your projected usage."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ze4-nDLfK4pw"
      },
      "source": [
        "### Set up your local development environment\n",
        "\n",
        "**If you are using Colab or Google Cloud Notebooks**, your environment already meets\n",
        "all the requirements to run this notebook. You can skip this step."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gCuSR8GkAgzl"
      },
      "source": [
        "**Otherwise**, make sure your environment meets this notebook's requirements.\n",
        "You need the following:\n",
        "\n",
        "* The Google Cloud SDK\n",
        "* Git\n",
        "* Python 3\n",
        "* virtualenv\n",
        "* Jupyter notebook running in a virtual environment with Python 3\n",
        "\n",
        "The Google Cloud guide to [Setting up a Python development\n",
        "environment](https://cloud.google.com/python/setup) and the [Jupyter\n",
        "installation guide](https://jupyter.org/install) provide detailed instructions\n",
        "for meeting these requirements. The following steps provide a condensed set of\n",
        "instructions:\n",
        "\n",
        "1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)\n",
        "\n",
        "1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)\n",
        "\n",
        "1. [Install\n",
        "   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)\n",
        "   and create a virtual environment that uses Python 3. Activate the virtual environment.\n",
        "\n",
        "1. To install Jupyter, run `pip3 install jupyter` on the\n",
        "command-line in a terminal shell.\n",
        "\n",
        "1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.\n",
        "\n",
        "1. Open this notebook in the Jupyter Notebook Dashboard."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "i7EUnXsZhAGF"
      },
      "source": [
        "### Install the required packages"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "2b4ef9b72d43"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "# The Google Cloud Notebook product has specific requirements\n",
        "IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists(\"/opt/deeplearning/metadata/env_version\")\n",
        "\n",
        "# Google Cloud Notebook requires dependencies to be installed with '--user'\n",
        "USER_FLAG = \"\"\n",
        "if IS_GOOGLE_CLOUD_NOTEBOOK:\n",
        "    USER_FLAG = \"--user\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "c04a3abc06e8"
      },
      "source": [
        "Install the latest version of the Vertex SDK for Python."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "wyy5Lbnzg5fi"
      },
      "outputs": [],
      "source": [
        "! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3c1b8d6e0eaa"
      },
      "source": [
        "Install the latest version of the Google Cloud Storage library."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "c491dc218314"
      },
      "outputs": [],
      "source": [
        "! pip3 install --upgrade google-cloud-storage $USER_FLAG"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hhq5zEbGg0XX"
      },
      "source": [
        "### Restart the kernel\n",
        "\n",
        "After you install the additional packages, you need to restart the notebook kernel so it can find the packages."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "EzrelQZ22IZj"
      },
      "outputs": [],
      "source": [
        "# Automatically restart kernel after installs\n",
        "import os\n",
        "\n",
        "if not os.getenv(\"IS_TESTING\"):\n",
        "    # Automatically restart kernel after installs\n",
        "    import IPython\n",
        "\n",
        "    app = IPython.Application.instance()\n",
        "    app.kernel.do_shutdown(True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "lWEdiXsJg0XY"
      },
      "source": [
        "## Before you begin\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BF1j6f9HApxa"
      },
      "source": [
        "### Set up your Google Cloud project\n",
        "\n",
        "**The following steps are required, regardless of your notebook environment.**\n",
        "\n",
        "1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.\n",
        "\n",
        "1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).\n",
        "\n",
        "1. [Enable the Vertex AI API and Compute Engine API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component).\n",
        "\n",
        "1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).\n",
        "\n",
        "1. Enter your project ID in the cell below. Then run the cell to make sure the\n",
        "Cloud SDK uses the right project for all the commands in this notebook.\n",
        "\n",
        "**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WReHDGG5g0XY"
      },
      "source": [
        "#### Set your project ID\n",
        "\n",
        "**If you don't know your project ID**, you may be able to get your project ID using `gcloud`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "oM1iC_MfAts1"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "PROJECT_ID = \"\"\n",
        "\n",
        "# Get your Google Cloud project ID from gcloud\n",
        "if not os.getenv(\"IS_TESTING\"):\n",
        "    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null\n",
        "    PROJECT_ID = shell_output[0]\n",
        "    print(\"Project ID: \", PROJECT_ID)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qJYoRfYng0XZ"
      },
      "source": [
        "Otherwise, set your project ID here."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "riG_qUokg0XZ"
      },
      "outputs": [],
      "source": [
        "if PROJECT_ID == \"\" or PROJECT_ID is None:\n",
        "    PROJECT_ID = \"[your-project-id]\"  # @param {type:\"string\"}\n",
        "! gcloud config set project {PROJECT_ID}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dr--iN2kAylZ"
      },
      "source": [
        "### Authenticate your Google Cloud account\n",
        "\n",
        "**If you are using Google Cloud Notebooks**, your environment is already\n",
        "authenticated. Skip this step."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "sBCra4QMA2wR"
      },
      "source": [
        "**If you are using Colab**, run the cell below and follow the instructions\n",
        "when prompted to authenticate your account via oAuth.\n",
        "\n",
        "**Otherwise**, follow these steps:\n",
        "\n",
        "1. In the Cloud Console, go to the [**Create service account key**\n",
        "   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).\n",
        "\n",
        "2. Click **Create service account**.\n",
        "\n",
        "3. In the **Service account name** field, enter a name, and\n",
        "   click **Create**.\n",
        "\n",
        "4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type \"Vertex AI\"\n",
        "into the filter box, and select\n",
        "   **Vertex AI Administrator**. Type \"Storage Object Admin\" into the filter box, and select **Storage Object Admin**.\n",
        "\n",
        "5. Click *Create*. A JSON file that contains your key downloads to your\n",
        "local environment.\n",
        "\n",
        "6. Enter the path to your service account key as the\n",
        "`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "PyQmSRbKA8r-"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "import sys\n",
        "\n",
        "# If you are running this notebook in Colab, run this cell and follow the\n",
        "# instructions to authenticate your GCP account. This provides access to your\n",
        "# Cloud Storage bucket and lets you submit training jobs and prediction\n",
        "# requests.\n",
        "\n",
        "# The Google Cloud Notebook product has specific requirements\n",
        "IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists(\"/opt/deeplearning/metadata/env_version\")\n",
        "\n",
        "# If on Google Cloud Notebooks, then don't execute this code\n",
        "if not IS_GOOGLE_CLOUD_NOTEBOOK:\n",
        "    if \"google.colab\" in sys.modules:\n",
        "        from google.colab import auth as google_auth\n",
        "\n",
        "        google_auth.authenticate_user()\n",
        "\n",
        "    # If you are running this notebook locally, replace the string below with the\n",
        "    # path to your service account key and run this cell to authenticate your GCP\n",
        "    # account.\n",
        "    elif not os.getenv(\"IS_TESTING\"):\n",
        "        %env GOOGLE_APPLICATION_CREDENTIALS ''"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "zgPO1eR3CYjk"
      },
      "source": [
        "### Create a Cloud Storage bucket\n",
        "\n",
        "**The following steps are required, regardless of your notebook environment.**\n",
        "\n",
        "\n",
        "In this example, your training application uses Cloud Storage for accessing training and validation datasets and for storing checkpoints. \n",
        "\n",
        "Set the name of your Cloud Storage bucket below. It must be unique across all\n",
        "Cloud Storage buckets.\n",
        "\n",
        "You may also change the `REGION` variable, which is used for operations\n",
        "throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are\n",
        "available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may\n",
        "not use a Multi-Regional Storage bucket for training with Vertex AI."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "MzGDU7TWdts_"
      },
      "outputs": [],
      "source": [
        "BUCKET_NAME = \"[your-bucket-name]\"  # @param {type:\"string\"}\n",
        "REGION = \"[your-region]\"  # @param {type:\"string\"}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-EcIXiGsCePi"
      },
      "source": [
        "**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "NIq7R4HZCfIc"
      },
      "outputs": [],
      "source": [
        "! gsutil mb -l $REGION $BUCKET_NAME"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ucvCsknMCims"
      },
      "source": [
        "Finally, validate access to your Cloud Storage bucket by examining its contents:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "vhOb7YnwClBb"
      },
      "outputs": [],
      "source": [
        "! gsutil ls -al $BUCKET_NAME"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "XoEqT2Y4DJmf"
      },
      "source": [
        "### Import libraries and define constants"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "pRUOFELefqf1"
      },
      "outputs": [],
      "source": [
        "import time\n",
        "\n",
        "from google.cloud import aiplatform"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "247269093fb4"
      },
      "source": [
        "### Initialize Vertex SDK"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "27570c702690"
      },
      "outputs": [],
      "source": [
        "aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "c07485571228"
      },
      "source": [
        "### Set dataset location"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "345476813f4c"
      },
      "source": [
        "The Multi-Genre Natural Language Inference Corpus (MNLI) dataset has been preprocessed to a format required by the TensorFlow NLP Modelling Toolkit and uploaded to a public Cloud Storage bucket."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "5fd00263570c"
      },
      "outputs": [],
      "source": [
        "DATASET_LOCATION = \"gs://cloud-samples-data/vertex-ai/community-content/datasets/MNLI\"\n",
        "TRAIN_FILE = f\"{DATASET_LOCATION}/mnli_train.tf_record\"\n",
        "EVAL_FILE = f\"{DATASET_LOCATION}/mnli_valid.tf_record\"\n",
        "METADATA_FILE = f\"{DATASET_LOCATION}/metadata.json\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "bbaf7f8a792c"
      },
      "outputs": [],
      "source": [
        "# List the files\n",
        "\n",
        "! gsutil ls {DATASET_LOCATION}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "8ea3b9ed322d"
      },
      "outputs": [],
      "source": [
        "# Examine dataset metadata\n",
        "\n",
        "! gsutil cat {METADATA_FILE}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dd9941a9c5d5"
      },
      "source": [
        "### Create a training container\n",
        "#### Write Dockerfile\n",
        "\n",
        "The first step in containerizing your code is to create a Dockerfile. In the Dockerfile, you'll include all the commands needed to run the image such as installing the necessary libraries and setting up the entry point for the training code.\n",
        "\n",
        "This Dockerfile uses the Vertex pre-built TensorFlow 2.5 GPU Docker image as a base image. After downloading that image, this Dockerfile installs the TensorFlow Official Models and TensorFlow Text libraries. The Reduction Server NCCL plugin is pre-installed in the base image."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "9c5378a292a0"
      },
      "outputs": [],
      "source": [
        "# Create training image directory\n",
        "\n",
        "! mkdir training_image"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "94530ed2c913"
      },
      "outputs": [],
      "source": [
        "%%writefile training_image/Dockerfile\n",
        "\n",
        "FROM us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-5:latest\n",
        "    \n",
        "WORKDIR /\n",
        "\n",
        "# Installs Official Models and Text libraries\n",
        "RUN pip install tf-models-official==2.5.1 tensorflow-text==2.5.0\n",
        "\n",
        "\n",
        "# Copies the trainer code to the docker image.\n",
        "COPY trainer /trainer"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "769d97f5bb46"
      },
      "source": [
        "#### Create training application code\n",
        "\n",
        "Next, you create a `training_image/trainer` directory with a `train.py` script that contains the code for your training application.\n",
        "\n",
        "The `train.py` training script is based on [the common training driver](https://github.com/tensorflow/models/blob/master/official/nlp/docs/train.md) from the TensorFlow NLP Modelling Toolkit. The common training driver script supports multiple NLP tasks (e.g., pre-training, GLUE and SQuAD fine-tuning) and multiple models (e.g., BERT, ALBERT). \n",
        "\n",
        "A set of configurations for a specific NLP task is called an experiment.  The toolkit includes a set of pre-defined experiments. When you invoke the script, you specificy an experiment type using the `--experiment` command line argument. There are two options for overriding the default experiment configuration:\n",
        "- Specify one or multiple YAML configurations with updated settings using the `--config_file` command line argument\n",
        "- Provide updated settings as a list of key/value pairs through the `--params_override` command line argument\n",
        "\n",
        "If you specify both `--config_file` and `--params_override`, the settings in `--params_override` take precedence.\n",
        "\n",
        "Retrieving the default experiment configuration and merging user provided settings is encapsulated in the `official.core.train_utils.parse_configuration` utility function from the TensorFlow Model Garden.\n",
        "\n",
        "In the following cell, the [common training driver script](https://github.com/tensorflow/models/blob/master/official/nlp/train.py)  has been adapted to work seamlessly on a distributed compute environment provisioned when running a Vertex Training job. The TensorFlow NLP Modelling Toolkit uses [Orbit](https://github.com/tensorflow/models/tree/master/orbit) to implement a custom training loop. The custom training loop saves checkpoints, writes Tensorboard summaries, and saves a trained model to a storage location specified through the `--model-dir` command line argument. \n",
        "\n",
        "Note that when using the base common training driver in a distributed setting, each worker uses the same base storage location. To avoid conflicts when multiple workers write to the same storage location, the driver code has been modified so that each worker uses a different storage location based on its role in a Vertex compute cluster. This logic is implemented in the `_get_model_dir` utility function.\n"
      ]
    },
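    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The directory selection described above can be sketched in isolation. The following cell is a minimal illustration and is not part of the training application: `resolve_model_dir` is a hypothetical helper that mirrors the core logic of `_get_model_dir`, the `gs://example-bucket/model` path is a placeholder, and the `TF_CONFIG` values are hand-written stand-ins for what a training cluster might set."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import json\n",
        "import os\n",
        "\n",
        "\n",
        "def resolve_model_dir(model_dir, tf_config_json=None):\n",
        "    # Hypothetical helper for illustration; mirrors _get_model_dir in train.py.\n",
        "    if not tf_config_json:\n",
        "        # No TF_CONFIG: single-machine run, use the directory as is.\n",
        "        return model_dir\n",
        "    task = json.loads(tf_config_json)['task']\n",
        "    if task['type'] == 'chief' and task['index'] == 0:\n",
        "        return model_dir\n",
        "    # Every non-chief worker writes to its own subdirectory.\n",
        "    return os.path.join(model_dir, 'worker-{}'.format(task['index']))\n",
        "\n",
        "\n",
        "# Hand-written TF_CONFIG examples (assumed values, not real cluster output).\n",
        "chief_config = json.dumps({'task': {'type': 'chief', 'index': 0}})\n",
        "worker_config = json.dumps({'task': {'type': 'worker', 'index': 1}})\n",
        "\n",
        "print(resolve_model_dir('gs://example-bucket/model', chief_config))\n",
        "print(resolve_model_dir('gs://example-bucket/model', worker_config))"
      ]
    },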
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "24a2466976fb"
      },
      "outputs": [],
      "source": [
        "# Create trainer directory\n",
        "\n",
        "! mkdir training_image/trainer"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ac9bbcec38fd"
      },
      "outputs": [],
      "source": [
        "%%writefile training_image/trainer/train.py\n",
        "\n",
        "# Copyright 2021 The TensorFlow Authors. All Rights Reserved.\n",
        "#\n",
        "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "#     http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License.\n",
        "\n",
        "\"\"\"TFM common training driver.\"\"\"\n",
        "\n",
        "import json\n",
        "import os\n",
        "\n",
        "from absl import app\n",
        "from absl import flags\n",
        "from absl import logging\n",
        "import gin\n",
        "from official.common import distribute_utils\n",
        "from official.common import flags as tfm_flags\n",
        "from official.common import registry_imports\n",
        "from official.core import task_factory\n",
        "from official.core import train_lib\n",
        "from official.core import train_utils\n",
        "from official.modeling import performance\n",
        "\n",
        "FLAGS = flags.FLAGS\n",
        "\n",
        "\n",
        "def _get_model_dir(model_dir):\n",
        "  \"\"\"Defines utility functions for model saving.\n",
        "\n",
        "  In a multi-worker scenario, the chief worker will save to the\n",
        "  desired model directory, while the other workers will save the model to\n",
        "  temporary directories. It’s important that these temporary directories\n",
        "  are unique in order to prevent multiple workers from writing to the same\n",
        "  location. Saving can contain collective ops, so all workers must save and\n",
        "  not just the chief.\n",
        "  \"\"\"\n",
        "\n",
        "  def _is_chief(task_type, task_id):\n",
        "    return ((task_type == 'chief' and task_id == 0) or task_type is None)\n",
        "\n",
        "  tf_config = os.getenv('TF_CONFIG')\n",
        "  if tf_config:\n",
        "    tf_config = json.loads(tf_config)\n",
        "\n",
        "    if not _is_chief(tf_config['task']['type'], tf_config['task']['index']):\n",
        "      model_dir = os.path.join(model_dir,\n",
        "                               'worker-{}').format(tf_config['task']['index'])\n",
        "\n",
        "  logging.info('Setting model_dir to: %s', model_dir)\n",
        "\n",
        "  return model_dir\n",
        "\n",
        "\n",
        "def main(_):\n",
        "\n",
        "  model_dir = _get_model_dir(FLAGS.model_dir)\n",
        "\n",
        "  gin.parse_config_files_and_bindings(FLAGS.gin_file, FLAGS.gin_params)\n",
        "  params = train_utils.parse_configuration(FLAGS)\n",
        "\n",
        "  if 'train' in FLAGS.mode:\n",
        "    # Pure eval modes do not output yaml files. Otherwise continuous eval job\n",
        "    # may race against the train job for writing the same file.\n",
        "    train_utils.serialize_config(params, model_dir)\n",
        "\n",
        "  # Sets mixed_precision policy. Using 'mixed_float16' or 'mixed_bfloat16'\n",
        "  # can have significant impact on model speeds by utilizing float16 in case of\n",
        "  # GPUs, and bfloat16 in the case of TPUs. loss_scale takes effect only when\n",
        "  # dtype is float16\n",
        "  if params.runtime.mixed_precision_dtype:\n",
        "    performance.set_mixed_precision_policy(params.runtime.mixed_precision_dtype)\n",
        "  distribution_strategy = distribute_utils.get_distribution_strategy(\n",
        "      distribution_strategy=params.runtime.distribution_strategy,\n",
        "      all_reduce_alg=params.runtime.all_reduce_alg,\n",
        "      num_gpus=params.runtime.num_gpus,\n",
        "      tpu_address=params.runtime.tpu,\n",
        "      **params.runtime.model_parallelism())\n",
        "  with distribution_strategy.scope():\n",
        "    task = task_factory.get_task(params.task, logging_dir=model_dir)\n",
        "\n",
        "  train_lib.run_experiment(\n",
        "      distribution_strategy=distribution_strategy,\n",
        "      task=task,\n",
        "      mode=FLAGS.mode,\n",
        "      params=params,\n",
        "      model_dir=model_dir)\n",
        "\n",
        "  train_utils.save_gin_config(FLAGS.mode, model_dir)\n",
        "\n",
        "\n",
        "if __name__ == '__main__':\n",
        "  tfm_flags.define_flags()\n",
        "  app.run(main)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "a481e8683121"
      },
      "source": [
        "####  Create base settings for the MNLI fine tuning experiment\n",
        "\n",
        "The TensorFlow NLP Modelling Toolkit includes a predefined experiment for a set of text classification tasks, the `bert/sentence_prediction` experiment. The base settings in the `bert/sentence_prediction` experiment need to be updated for the MNLI fine tuning task you perform in this example.\n",
        "\n",
        "In the next cell, you update the settings by creating a YAML configuration file that will be referenced when invoking a training script. As noted earlier, you can fine tune these settings even further for each training run by using the `--params_override` flag.\n",
        "\n",
        "The configuration file has three sections: `task`, `trainer`, and `runtime`.\n",
        "* The `task` section contains settings specific to your machine learning task, including a URI to a pre-trained BERT model,  training and validation datasets settings, and evaluation metrics. \n",
        "* The `trainer` section configures the settings that control the custom training loop, like checkpoint interval or a number of training steps. \n",
        "* The `runtime` section includes the settings for a training runtime: a distributed training strategy, GPU configurations, etc."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "d66c492b923c"
      },
      "outputs": [],
      "source": [
        "%%writefile training_image/trainer/glue_mnli_matched.yaml\n",
        "\n",
        "task:\n",
        "  hub_module_url: 'https://tfhub.dev/tensorflow/bert_en_uncased_L-24_H-1024_A-16/4'\n",
        "  model:\n",
        "    num_classes: 3\n",
        "  init_checkpoint: ''\n",
        "  metric_type: 'accuracy'\n",
        "  train_data:\n",
        "    drop_remainder: true\n",
        "    global_batch_size: 32\n",
        "    input_path: ''\n",
        "    is_training: true\n",
        "    seq_length: 128\n",
        "    label_type: 'int'\n",
        "  validation_data:\n",
        "    drop_remainder: false\n",
        "    global_batch_size: 32\n",
        "    input_path: ''\n",
        "    is_training: false\n",
        "    seq_length: 128\n",
        "    label_type: 'int'\n",
        "trainer:\n",
        "  checkpoint_interval: 3000\n",
        "  optimizer_config:\n",
        "    learning_rate:\n",
        "      polynomial:\n",
        "        # 100% of train_steps.\n",
        "        decay_steps: 36813\n",
        "        end_learning_rate: 0.0\n",
        "        initial_learning_rate: 3.0e-05\n",
        "        power: 1.0\n",
        "      type: polynomial\n",
        "    optimizer:\n",
        "      type: adamw\n",
        "    warmup:\n",
        "      polynomial:\n",
        "        power: 1\n",
        "        # ~10% of train_steps.\n",
        "        warmup_steps: 3681\n",
        "      type: polynomial\n",
        "  steps_per_loop: 1000\n",
        "  summary_interval: 1000\n",
        "  # Training data size 392,702 examples, 3 epochs.\n",
        "  train_steps: 36813\n",
        "  validation_interval: 6135\n",
        "  # Eval data size = 9815 examples.\n",
        "  validation_steps: 307\n",
        "  best_checkpoint_export_subdir: 'best_ckpt'\n",
        "  best_checkpoint_eval_metric: 'cls_accuracy'\n",
        "  best_checkpoint_metric_comp: 'higher'\n",
        "runtime:\n",
        "  distribution_strategy: 'multi_worker_mirrored'\n",
        "  all_reduce_alg: 'nccl'"
      ]
    },
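    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The step counts in the configuration above appear to follow from the dataset sizes noted in its comments. The following cell sketches one plausible derivation. Reading `warmup_steps` as one tenth of the training steps (rounded down) and `validation_interval` as half an epoch is an assumption inferred from the values, and the variable names are illustrative only."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import math\n",
        "\n",
        "TRAIN_EXAMPLES = 392702  # training data size noted in the YAML comments\n",
        "EVAL_EXAMPLES = 9815  # eval data size noted in the YAML comments\n",
        "GLOBAL_BATCH_SIZE = 32\n",
        "EPOCHS = 3\n",
        "\n",
        "# drop_remainder is true for training, so a partial final batch is not counted.\n",
        "steps_per_epoch = TRAIN_EXAMPLES // GLOBAL_BATCH_SIZE\n",
        "train_steps = steps_per_epoch * EPOCHS\n",
        "warmup_steps = train_steps // 10  # assumed: ~10% of train_steps\n",
        "validation_interval = steps_per_epoch // 2  # assumed: twice per epoch\n",
        "\n",
        "# drop_remainder is false for validation, so the last partial batch counts.\n",
        "validation_steps = math.ceil(EVAL_EXAMPLES / GLOBAL_BATCH_SIZE)\n",
        "\n",
        "print(train_steps, warmup_steps, validation_interval, validation_steps)"
      ]
    },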
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "22dae83d50c5"
      },
      "source": [
        "### Build the container\n",
        "\n",
        "In the next cells, you build the container and push it to Google Container Registry."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "9f7015f17eea"
      },
      "outputs": [],
      "source": [
        "TRAIN_IMAGE = f\"gcr.io/{PROJECT_ID}/mnli_finetuning\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "0247689ca75e"
      },
      "outputs": [],
      "source": [
        "! docker build -t {TRAIN_IMAGE} training_image"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "7887a8df1345"
      },
      "outputs": [],
      "source": [
        "! docker push {TRAIN_IMAGE}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bc35446749b5"
      },
      "source": [
        "### Create a custom training job\n",
        "\n",
        "When you run a distributed training job with Vertex AI, you specify multiple machines (nodes) in a training cluster. The training service allocates the resources for the machine types you specify. Your running job on a given node is called a replica. A group of replicas with the same configuration is called a worker pool. Vertex Training provides 4 worker pools to cover the different types of machine tasks. To use the Reduction Server, you'll need to use 3 of the 4 available worker pools.\n",
        "\n",
        "* **Worker pool 0** configures the primary replica, also called the chief, scheduler, or \"master\". This worker generally takes on extra work such as saving checkpoints and writing summary files. There is only ever one chief worker in a cluster, so the replica count for worker pool 0 is always 1.\n",
        "\n",
        "* **Worker pool 1** is where you configure the rest of the workers for your cluster. \n",
        " \n",
        "* **Worker pool 2** manages Reduction Server reducers. \n",
        "\n",
        "Worker pools 0 and 1 run the custom training container you created in the previous step. Worker pool 2 uses the Reduction Server image provided by Vertex AI.\n",
        "\n",
        "The helper function below creates worker pool specifications using the described worker pool topology."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "7a924769c205"
      },
      "outputs": [],
      "source": [
        "def prepare_worker_pool_specs(\n",
        "    image_uri,\n",
        "    args,\n",
        "    cmd,\n",
        "    replica_count=1,\n",
        "    machine_type=\"n1-standard-4\",\n",
        "    accelerator_count=0,\n",
        "    accelerator_type=\"ACCELERATOR_TYPE_UNSPECIFIED\",\n",
        "    reduction_server_count=0,\n",
        "    reduction_server_machine_type=\"n1-highcpu-16\",\n",
        "    reduction_server_image_uri=\"us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest\",\n",
        "):\n",
        "    \"\"\"Build worker pool specs: one chief, optional workers, optional reducers.\"\"\"\n",
        "\n",
        "    if accelerator_count > 0:\n",
        "        machine_spec = {\n",
        "            \"machine_type\": machine_type,\n",
        "            \"accelerator_type\": accelerator_type,\n",
        "            \"accelerator_count\": accelerator_count,\n",
        "        }\n",
        "    else:\n",
        "        machine_spec = {\"machine_type\": machine_type}\n",
        "\n",
        "    container_spec = {\n",
        "        \"image_uri\": image_uri,\n",
        "        \"args\": args,\n",
        "        \"command\": cmd,\n",
        "    }\n",
        "\n",
        "    chief_spec = {\n",
        "        \"replica_count\": 1,\n",
        "        \"machine_spec\": machine_spec,\n",
        "        \"container_spec\": container_spec,\n",
        "    }\n",
        "\n",
        "    worker_pool_specs = [chief_spec]\n",
        "    if replica_count > 1:\n",
        "        workers_spec = {\n",
        "            \"replica_count\": replica_count - 1,\n",
        "            \"machine_spec\": machine_spec,\n",
        "            \"container_spec\": container_spec,\n",
        "        }\n",
        "        worker_pool_specs.append(workers_spec)\n",
        "\n",
        "    # Add the reducer pool whenever at least one reducer is requested.\n",
        "    # Reducers belong in worker pool 2, so this assumes replica_count > 1.\n",
        "    if reduction_server_count > 0:\n",
        "        reducers_spec = {\n",
        "            \"replica_count\": reduction_server_count,\n",
        "            \"machine_spec\": {\n",
        "                \"machine_type\": reduction_server_machine_type,\n",
        "            },\n",
        "            \"container_spec\": {\"image_uri\": reduction_server_image_uri},\n",
        "        }\n",
        "        worker_pool_specs.append(reducers_spec)\n",
        "\n",
        "    return worker_pool_specs"
      ]
    },
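    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To inspect the topology that the helper is meant to produce, the sketch below labels each entry of a worker pool specification list by its position, following the pool roles described above. `summarize_pools` is a hypothetical helper written for illustration, and the sample list is hand-written to mirror a two-machine, four-reducer configuration rather than produced by a real call.\n",
        "\n",
        "```python\n",
        "POOL_ROLES = [\"chief\", \"workers\", \"reducers\"]\n",
        "\n",
        "\n",
        "def summarize_pools(worker_pool_specs):\n",
        "    \"\"\"Return (role, replica_count, machine_type) for each worker pool.\"\"\"\n",
        "    summary = []\n",
        "    for role, spec in zip(POOL_ROLES, worker_pool_specs):\n",
        "        machine_type = spec[\"machine_spec\"][\"machine_type\"]\n",
        "        summary.append((role, spec[\"replica_count\"], machine_type))\n",
        "    return summary\n",
        "\n",
        "\n",
        "# Hand-written sample: 1 chief + 1 worker on GPU machines, 4 reducers.\n",
        "sample_specs = [\n",
        "    {\"replica_count\": 1, \"machine_spec\": {\"machine_type\": \"a2-highgpu-4g\"}},\n",
        "    {\"replica_count\": 1, \"machine_spec\": {\"machine_type\": \"a2-highgpu-4g\"}},\n",
        "    {\"replica_count\": 4, \"machine_spec\": {\"machine_type\": \"n1-highcpu-16\"}},\n",
        "]\n",
        "\n",
        "for row in summarize_pools(sample_specs):\n",
        "    print(row)\n",
        "```\n",
        "\n",
        "Roles are assigned purely by list position in this sketch, so the reducer pool has to be the third entry in the list."
      ]
    },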
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "a753112e1484"
      },
      "source": [
        "#### Configure worker pools\n",
        "\n",
        "When choosing the number and type of reducers, consider the network bandwidth supported by the reducer replica’s machine type. On Google Cloud, a VM’s machine type defines its maximum possible egress bandwidth. For example, the egress bandwidth of the `n1-highcpu-16` machine type is capped at 32 Gbps.\n",
        "\n",
        "Because reducers perform a very limited function, aggregating blocks of gradients, they can run on relatively low-powered, cost-effective machines. Even with a large number of gradients, this computation does not require accelerated hardware or high CPU or memory resources. However, to avoid network bottlenecks, the total aggregate bandwidth of all replicas in the reducer worker pool must be greater than or equal to the total aggregate bandwidth of all replicas in worker pools 0 and 1, which host the GPU workers."
      ]
    },
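    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The bandwidth rule above can be turned into a quick sizing check. The sketch below is a minimal illustration and not part of the Vertex AI SDK: `min_reducer_count` is a hypothetical helper, and the 50 Gbps figure for `a2-highgpu-4g` is an assumption that you should confirm against the current Compute Engine network bandwidth documentation. The 32 Gbps figure for `n1-highcpu-16` is the one quoted above.\n",
        "\n",
        "```python\n",
        "import math\n",
        "\n",
        "\n",
        "def min_reducer_count(worker_count, worker_gbps, reducer_gbps):\n",
        "    \"\"\"Smallest reducer count whose total bandwidth covers the workers' total.\"\"\"\n",
        "    if worker_count < 1 or worker_gbps <= 0 or reducer_gbps <= 0:\n",
        "        raise ValueError(\"counts and bandwidths must be positive\")\n",
        "    return math.ceil(worker_count * worker_gbps / reducer_gbps)\n",
        "\n",
        "\n",
        "# Assumed egress caps in Gbps; verify them for your machine types.\n",
        "A2_HIGHGPU_4G_GBPS = 50  # assumption, not taken from this notebook\n",
        "N1_HIGHCPU_16_GBPS = 32  # value quoted in the text above\n",
        "\n",
        "# Two GPU machines in total (worker pools 0 and 1 combined).\n",
        "print(min_reducer_count(2, A2_HIGHGPU_4G_GBPS, N1_HIGHCPU_16_GBPS))\n",
        "```\n",
        "\n",
        "Under these assumed figures, the workers total 100 Gbps and the helper returns 4, which matches the `REDUCTION_SERVER_COUNT` set in the next cell."
      ]
    },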
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "84c6a7f3b1bd"
      },
      "outputs": [],
      "source": [
        "REPLICA_COUNT = 2\n",
        "WORKER_MACHINE_TYPE = \"a2-highgpu-4g\"\n",
        "ACCELERATOR_TYPE = \"NVIDIA_TESLA_A100\"\n",
        "PER_MACHINE_ACCELERATOR_COUNT = 4\n",
        "PER_REPLICA_BATCH_SIZE = 32\n",
        "\n",
        "REDUCTION_SERVER_COUNT = 4\n",
        "REDUCTION_SERVER_MACHINE_TYPE = \"n1-highcpu-16\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "254ff1d16134"
      },
      "source": [
        "#### Fine-tune the MNLI experiment settings\n",
        "\n",
        "The default settings for the MNLI fine tuning experiment have been configured in the YAML configuration file created in the previous steps. To override the defaults for a specific training run, use the `--params_override` flag.\n",
        "\n",
        "`params_override` accepts a string with comma separated key/value pairs for each parameter to be overridden.\n",
        "\n",
        "In the next cell you update the following settings:\n",
        "\n",
        "- `trainer.train_steps` - The number of training steps.\n",
        "- `trainer.steps_per_loop` - The training script prints training progress updates every `steps_per_loop` steps.\n",
        "- `trainer.summary_interval` - The training script logs TensorBoard summaries every `summary_interval` steps.\n",
        "- `trainer.validation_interval` - The training script runs validation every `validation_interval` steps.\n",
        "- `trainer.checkpoint_interval` - The training script creates a checkpoint every `checkpoint_interval` steps.\n",
        "- `task.train_data.global_batch_size` - Global batch size for training data.\n",
        "- `task.validation_data.global_batch_size` - Global batch size for validation data.\n",
        "- `task.train_data.input_path` - Location of the training dataset.\n",
        "- `task.validation_data.input_path` - Location of the validation dataset.\n",
        "- `runtime.num_gpus` - Number of GPUs to use on each worker."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "9f3c4229ce93"
      },
      "outputs": [],
      "source": [
        "PARAMS_OVERRIDE = \",\".join(\n",
        "    [\n",
        "        \"trainer.train_steps=2000\",\n",
        "        \"trainer.steps_per_loop=100\",\n",
        "        \"trainer.summary_interval=100\",\n",
        "        \"trainer.validation_interval=2000\",\n",
        "        \"trainer.checkpoint_interval=2000\",\n",
        "        \"task.train_data.global_batch_size=\"\n",
        "        + str(REPLICA_COUNT * PER_REPLICA_BATCH_SIZE * PER_MACHINE_ACCELERATOR_COUNT),\n",
        "        \"task.validation_data.global_batch_size=\"\n",
        "        + str(REPLICA_COUNT * PER_REPLICA_BATCH_SIZE * PER_MACHINE_ACCELERATOR_COUNT),\n",
        "        \"task.train_data.input_path=\" + TRAIN_FILE,\n",
        "        \"task.validation_data.input_path=\" + EVAL_FILE,\n",
        "        \"runtime.num_gpus=\" + str(PER_MACHINE_ACCELERATOR_COUNT),\n",
        "    ]\n",
        ")"
      ]
    },
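    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a sanity check before submitting a job, you can split an override string back into its key/value pairs and confirm the computed batch size. The sketch below is self-contained and uses stand-in values that mirror this notebook's configuration. `parse_overrides` is an illustrative helper, not part of the Model Garden, and it assumes that no value contains a comma.\n",
        "\n",
        "```python\n",
        "def parse_overrides(params_override):\n",
        "    \"\"\"Split a 'key=value,key=value' string into a dict of strings.\"\"\"\n",
        "    pairs = {}\n",
        "    for item in params_override.split(\",\"):\n",
        "        # Split on the first '=' only, so values may contain '='.\n",
        "        key, value = item.split(\"=\", 1)\n",
        "        pairs[key] = value\n",
        "    return pairs\n",
        "\n",
        "\n",
        "# Stand-in values mirroring the configuration in this notebook.\n",
        "replica_count = 2\n",
        "gpus_per_machine = 4\n",
        "per_replica_batch_size = 32\n",
        "global_batch_size = replica_count * gpus_per_machine * per_replica_batch_size\n",
        "\n",
        "example = \",\".join(\n",
        "    [\n",
        "        \"trainer.train_steps=2000\",\n",
        "        \"task.train_data.global_batch_size=\" + str(global_batch_size),\n",
        "        \"runtime.num_gpus=\" + str(gpus_per_machine),\n",
        "    ]\n",
        ")\n",
        "\n",
        "overrides = parse_overrides(example)\n",
        "print(overrides[\"task.train_data.global_batch_size\"])\n",
        "```\n",
        "\n",
        "With 2 machines, 4 GPUs per machine, and a per-replica batch size of 32, the global batch size is 2 × 4 × 32 = 256."
      ]
    },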
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "d1b92d8c86d8"
      },
      "source": [
        "### Submit and monitor the job\n",
        "\n",
        "After you define the experiment and worker pool configuration parameters, use the Vertex AI SDK for Python to submit and monitor a training job."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "5dd69814c1d5"
      },
      "outputs": [],
      "source": [
        "JOB_NAME = \"MNLI_{}\".format(time.strftime(\"%Y%m%d_%H%M%S\"))\n",
        "MODEL_DIR = f\"{BUCKET_NAME}/{JOB_NAME}/model\"\n",
        "\n",
        "WORKER_CMD = [\"python\", \"trainer/train.py\"]\n",
        "WORKER_ARGS = [\n",
        "    \"--experiment=bert/sentence_prediction\",\n",
        "    \"--mode=train\",\n",
        "    \"--model_dir=\" + MODEL_DIR,\n",
        "    \"--config_file=trainer/glue_mnli_matched.yaml\",\n",
        "    \"--params_override=\" + PARAMS_OVERRIDE,\n",
        "]\n",
        "\n",
        "worker_pool_specs = prepare_worker_pool_specs(\n",
        "    image_uri=TRAIN_IMAGE,\n",
        "    args=WORKER_ARGS,\n",
        "    cmd=WORKER_CMD,\n",
        "    replica_count=REPLICA_COUNT,\n",
        "    machine_type=WORKER_MACHINE_TYPE,\n",
        "    accelerator_count=PER_MACHINE_ACCELERATOR_COUNT,\n",
        "    accelerator_type=ACCELERATOR_TYPE,\n",
        "    reduction_server_count=REDUCTION_SERVER_COUNT,\n",
        "    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "caeb74cdba20"
      },
      "outputs": [],
      "source": [
        "job = aiplatform.CustomJob(\n",
        "    display_name=JOB_NAME,\n",
        "    worker_pool_specs=worker_pool_specs,\n",
        ")\n",
        "job.run(sync=False)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TpV-iwP9qw9c"
      },
      "source": [
        "## Cleaning up\n",
        "\n",
        "To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n",
        "project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n",
        "\n",
        "Otherwise, you can delete the individual resources you created in this tutorial:\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "sx_vKniMq9ZX"
      },
      "outputs": [],
      "source": [
        "# Delete Cloud Storage objects that were created\n",
        "! gsutil -m rm -r {BUCKET_NAME}\n",
        "\n",
        "# Delete the training job; set the flag to False to keep it\n",
        "delete_training_job = True\n",
        "if delete_training_job:\n",
        "    job.delete()"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "distributed-training-reduction-server.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
